使用ZeroMq源测试Kuiper吞吐量 您所在的位置:网站首页 go zeromq 使用ZeroMq源测试Kuiper吞吐量

使用ZeroMq源测试Kuiper吞吐量

2023-04-13 17:39| 来源: 网络整理| 查看: 265

EMQ X Kuiper支持各种source(MQTT、ZeroMq、EdgeX),默认支持MQTT 1、关于Kuiper的介绍及简单使用,请参考:https://docs.emqx.io/kuiper/latest/cn/ 2、关于Kuiper与EdgeX Foundry集成,请参照:https://www.jianshu.com/p/0726d41b00bf

本文实践ZeroMq作为消息总线源(实际上EdgeX内部使用的正是ZeroMq),Kuiper订阅其消息,并以此测试Kuiper的性能,kuiper官方给出性能测试结果:https://docs.emqx.io/kuiper/latest/cn/

操作系统: Ubuntu 18.04 测试原理描述:启动一个go应用向ZeroMq发送消息,同时Kuiper订阅来自ZeroMq的消息,经过Kuiper规则处理后输出消息 操作步骤: 1、创建一个go应用,向ZeroMq发送若干条消息,每条消息类似: {"device":"demo","readings":[{"device":"Temperature device","name":"Temperature","value":"40"},{"device":"Humidity device","name":"Humidity","value":"45"}]},大小大概为157字节。

go应用源码:https://github.com/emqx/kuiper/blob/master/fvt_scripts/edgex/benchmark/pub.go go应用与kuiper在一台机器上 2、由于Kuiper对zmq源的支持是以插件的形式支持的,而且必须与Kuiper的版本相配套,故此处kuiper采用源码编译打包,同时编译zmq的插件

zj@zj-Z390-UD:~/文档/项目工作$ git clone https://github.com/emqx/kuiper.git 正克隆到 'kuiper'... remote: Enumerating objects: 28, done. remote: Counting objects: 100% (28/28), done. remote: Compressing objects: 100% (19/19), done. remote: Total 5326 (delta 10), reused 21 (delta 9), pack-reused 5298 接收对象中: 100% (5326/5326), 17.02 MiB | 597.00 KiB/s, 完成. 处理 delta 中: 100% (3332/3332), 完成. zj@zj-Z390-UD:~/文档/项目工作$ cd kuiper/ zj@zj-Z390-UD:~/文档/项目工作/kuiper$ make pkg Build successfully make[1]: 进入目录“/home/zj/文档/项目工作/kuiper” Package build success make[1]: 离开目录“/home/zj/文档/项目工作/kuiper” zj@zj-Z390-UD:~/文档/项目工作/kuiper$ ll -h _packages/ 总用量 18M drwxr-xr-x 2 zj zj 4.0K 6月 8 10:51 ./ drwxr-xr-x 15 zj zj 4.0K 6月 8 10:51 ../ -rw-r--r-- 1 zj zj 8.8M 6月 8 10:51 kuiper-0.4.1-linux-x86_64.tar.gz -rw-r--r-- 1 zj zj 8.8M 6月 8 10:51 kuiper-0.4.1-linux-x86_64.zip zj@zj-Z390-UD:~/文档/项目工作/kuiper$ go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go zj@zj-Z390-UD:~/文档/项目工作/kuiper$ ll -h plugins/sources 总用量 5.0M drwxr-xr-x 2 zj zj 4.0K 6月 8 10:54 ./ drwxr-xr-x 6 zj zj 4.0K 6月 8 10:50 ../ -rw-r--r-- 1 zj zj 1.5K 6月 8 10:50 random.go -rw-r--r-- 1 zj zj 2.1K 6月 8 10:50 zmq.go -rw-r--r-- 1 zj zj 5.0M 6月 8 10:54 Zmq.so zj@zj-Z390-UD:~/文档/项目工作/kuiper$ cd _packages/ zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages$ tar zxf kuiper-0.4.1-linux-x86_64.tar.gz zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages$ cd kuiper-0.4.1-linux-x86_64/ zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cp ../../plugins/sources/Zmq.so plugins/sources/ zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ll plugins/sources/ 总用量 5080 drwxr-xr-x 2 zj zj 4096 6月 8 10:57 ./ drwxr-xr-x 5 zj zj 4096 6月 8 10:51 ../ -rw-r--r-- 1 zj zj 5191448 6月 8 10:57 Zmq.so

3、修改source配置

zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat etc/sources/zmq.yaml #Global Zmq configurations default: server: tcp://10.0.105.143:5563 topic: events

10.0.105.143是我的go应用所在的主机IP

4、启动Kuiper server进程

zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ./bin/server Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081.

5、设置Kuiper订阅的数据流及过滤规则

zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create stream demo '() WITH (FORMAT="JSON", TYPE="zmq", DATASOURCE="events")' Connecting to 127.0.0.1:20498... Stream demo is created. zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ vi rule.txt zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat rule.txt { "sql": "SELECT * from demo", "actions": [ { "log": { "concurrency": 50, "bufferLength": 10240, "cacheLength": 102400, "runAsync": true } } ], "options": { "concurrency": 30, "bufferLength": 10240 } } zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create rule rule1 -f rule.txt Connecting to 127.0.0.1:20498... Creating a new rule from file rule.txt. Rule rule1 was created successfully, please use 'bin/cli getstatus rule rule1' command to get rule status. zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat log/stream.log time="2020-06-08T11:00:26+08:00" level=info msg="db location is /home/zj/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64/data/" file="server.go:31" time="2020-06-08T11:00:26+08:00" level=info msg="Starting rules" file="server.go:48" time="2020-06-08T11:00:26+08:00" level=info msg="Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081. \n" file="server.go:101" time="2020-06-08T11:02:36+08:00" level=info msg="Stream demo is created." file="xsql_processor.go:75" time="2020-06-08T11:05:52+08:00" level=info msg="Rule rule1 is created." file="xsql_processor.go:226" time="2020-06-08T11:05:52+08:00" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 30, bufferLength: 10240" file="xsql_processor.go:399" time="2020-06-08T11:05:52+08:00" level=info msg="Opening stream" file="streams.go:89" rule=rule1 time="2020-06-08T11:05:52+08:00" level=info msg="open source node demo with option map[DATASOURCE:events FORMAT:JSON TYPE:zmq]" file="source_node.go:59" rule=rule1 time="2020-06-08T11:05:52+08:00" level=info msg="open source node 1 instances" file="source_node.go:78" rule=rule1 time="2020-06-08T11:05:52+08:00" level=info msg="open sink node 50 instances" file="sink_node.go:143" rule=rule1 time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1 time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1 time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1 time="2020-06-08T11:05:52+08:00" level=info msg="Start source demo instance 0 successfully" file="source_node.go:115" rule=rule1

Kuiper规则配置如下:

{ "sql": "SELECT * from demo", "actions": [ { "log": { "concurrency": 50, "bufferLength": 10240, "cacheLength": 102400, "runAsync": true } } ], "options": { "concurrency": 30, "bufferLength": 10240 } }

以上参数含义:https://github.com/emqx/kuiper/blob/master/docs/zh_CN/rules/overview.md

6、go应用发送消息耗时:0.344352s

image.png

同时kuiper侧订阅zeroMq主题为events的消息,过滤了所有消息

image.png

说明,kuiper在0.344352s处理了9676条数据

经测试:

image.png

注:测试发现当数据量大时,明显丢包严重



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有